package cn.sheep.violet.report

import cn.sheep.violet.bean.RptArea
import cn.sheep.violet.config.ConfigHandler
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

/** Regional distribution report (per province and city) - Spark Core (RDD) implementation.
  *
  * Reads the parquet data at `ConfigHandler.parquetFilePath`, aggregates nine counters per
  * (provincename, cityname) pair and overwrites the JDBC table `orc_report_area`
  * (connection taken from `ConfigHandler.url` / `ConfigHandler.props`).
  *
  * author: old sheep
  * QQ: 64341393
  * Created 2018/10/16
  */
object AreaAnalysisCore {

    import org.apache.spark.sql.Row

    /** Job entry point. `args` is not used. */
    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf()
        sparkConf.setAppName("地域分布统计-core")
        // Default to local mode, but do not clobber a master supplied by spark-submit:
        // values set directly on SparkConf take precedence over `--master`.
        sparkConf.setIfMissing("spark.master", "local[*]")
        // Use Kryo for serialization (e.g. of shuffled data).
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

        val sc = new SparkContext(sparkConf)
        try {
            val sqlContext = new SQLContext(sc)
            import sqlContext.implicits._

            // Load the input data.
            val dataFrame = sqlContext.read.parquet(ConfigHandler.parquetFilePath)

            // Go through the RDD API explicitly: on Spark 2.x `DataFrame.map` returns a Dataset
            // (requires an Encoder and has no reduceByKey); on 1.x it already returned an RDD.
            dataFrame.rdd
                .map(row => toAreaMetrics(row))
                .reduceByKey((a, b) => sumMetrics(a, b))
                .map { case ((provinceName, cityName), m) =>
                    // Indices 0-6 are 0/1 counters summed as Double, so they are exact integers.
                    RptArea(
                        provinceName,
                        cityName,
                        m(0).toInt,
                        m(1).toInt,
                        m(2).toInt,
                        m(3).toInt,
                        m(4).toInt,
                        m(5).toInt,
                        m(6).toInt,
                        m(7),
                        m(8)
                    )
                }
                .toDF()
                .write
                .mode(SaveMode.Overwrite)
                .jdbc(ConfigHandler.url, "orc_report_area", ConfigHandler.props)
        } finally {
            // Release the context even when reading, aggregating or writing fails.
            sc.stop()
        }
    }

    /** Maps one input row to its (provincename, cityname) key and nine metric values.
      *
      * Metric layout (index: value):
      *  0: requestmode == 1 && processnode >= 1
      *  1: requestmode == 1 && processnode >= 2
      *  2: requestmode == 1 && processnode == 3
      *  3: iseffective == 1 && isbilling == 1 && isbid == 1 && adorderid != 0
      *  4: iseffective == 1 && isbilling == 1 && iswin == 1
      *  5: requestmode == 2 && iseffective == 1
      *  6: requestmode == 3 && iseffective == 1
      *  7: adpayment / 1000 when metric 4 holds, else 0
      *  8: winprice / 1000 when metric 4 holds, else 0
      * Indices 0-6 are 1.0 when the condition holds and 0.0 otherwise.
      */
    private def toAreaMetrics(row: Row): ((String, String), Array[Double]) = {
        // Dimension fields
        val provinceName = row.getAs[String]("provincename")
        val cityName = row.getAs[String]("cityname")

        // Fields the metrics are derived from
        val reqMode = row.getAs[Int]("requestmode")
        val proNode = row.getAs[Int]("processnode")
        val effective = row.getAs[Int]("iseffective")
        val billing = row.getAs[Int]("isbilling")
        val isBid = row.getAs[Int]("isbid")
        val isWin = row.getAs[Int]("iswin")
        val adOrderId = row.getAs[Int]("adorderid")

        def flag(cond: Boolean): Double = if (cond) 1d else 0d

        val rawReq = flag(reqMode == 1 && proNode >= 1)
        val effReq = flag(reqMode == 1 && proNode >= 2)
        val adReq = flag(reqMode == 1 && proNode == 3)

        val adRtbReq = flag(effective == 1 && billing == 1 && isBid == 1 && adOrderId != 0)

        // The price columns are only read for rows that satisfy the win condition.
        val (adSuccRtb, adCost, adConsumption) =
            if (effective == 1 && billing == 1 && isWin == 1)
                (1d, row.getAs[Double]("adpayment") / 1000, row.getAs[Double]("winprice") / 1000)
            else
                (0d, 0d, 0d)

        val adShow = flag(reqMode == 2 && effective == 1)
        val adClick = flag(reqMode == 3 && effective == 1)

        ((provinceName, cityName),
            Array(rawReq, effReq, adReq, adRtbReq, adSuccRtb, adShow, adClick, adCost, adConsumption))
    }

    /** Element-wise sum of two metric arrays of equal length; returns a new array. */
    private def sumMetrics(a: Array[Double], b: Array[Double]): Array[Double] =
        Array.tabulate(a.length)(i => a(i) + b(i))

}
